package com.atguigu.app.dwd;

import com.atguigu.common.Constant;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * DWD job: builds the interaction comment-info fact stream.
 *
 * <p>Pipeline: Kafka {@code topic_db} (CDC envelope) -> filter rows of the
 * {@code comment_info} table -> processing-time lookup join against the HBase
 * dimension {@code base_dic} (to resolve the appraise code into a name) ->
 * Kafka sink topic {@link Constant#TOPIC_DWD_INTERACTION_COMMENT_INFO}.
 */
public class Dwd02_InteractionCommentInfo {

    public static void main(String[] args) {

        // 1. Execution environment plus its Table API bridge.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // 1.1 Checkpointing — kept disabled for local testing; enable for production runs.
//        streamEnv.enableCheckpointing(10000L);
//        CheckpointConfig checkpointConfig = streamEnv.getCheckpointConfig();
//        checkpointConfig.setCheckpointTimeout(20000L);
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
//        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        //checkpointConfig.setCheckpointInterval(10000L);
//        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
//        checkpointConfig.setMaxConcurrentCheckpoints(2);
//        // default is Integer.MAX_VALUE
//        streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//        streamEnv.setStateBackend(new HashMapStateBackend());
//
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 2. Register the Kafka topic_db source via FlinkSQL DDL; the DDL exposes a
        //    processing-time column `pt` needed later for the temporal lookup join.
        sqlEnv.executeSql(KafkaUtil.getTopicDbDDL("dwd_comment_info_230315"));

        // Debug probe:
        //sqlEnv.sqlQuery("select * from topic_db").execute().print();

        // 3. Keep only comment_info inserts/updates and flatten the CDC `data` map
        //    into plain columns, carrying the processing-time attribute along.
        String commentFilterSql =
                "select\n" +
                "    `data`['id'] id,\n" +
                "    `data`['user_id'] user_id,\n" +
                "    `data`['nick_name'] nick_name,\n" +
                "    `data`['sku_id'] sku_id,\n" +
                "    `data`['spu_id'] spu_id,\n" +
                "    `data`['order_id'] order_id,\n" +
                "    `data`['appraise'] appraise,\n" +
                "    `data`['comment_txt'] comment_txt,\n" +
                "    `data`['create_time'] create_time,\n" +
                "    `data`['operate_time'] operate_time,\n" +
                "    `pt`\n" +
                "from topic_db\n" +
                "where `database`='gmall-230315-flink'\n" +
                "and `table`='comment_info'\n" +
                "and (`type`='insert' or `type`='update')";
        Table filteredComments = sqlEnv.sqlQuery(commentFilterSql);
        sqlEnv.createTemporaryView("comment_info", filteredComments);
        //sqlEnv.sqlQuery("select * from comment_info").execute().print();

        // 4. Register the HBase-backed base_dic dimension table (rowkey = dic_code).
        String baseDicDdl =
                "create table base_dic(\n" +
                "    dic_code string,\n" +
                "    info ROW<dic_name string>\n" +
                ") with (\n" +
                "    'connector' = 'hbase-2.2',\n" +
                "    'table-name' = 'gmall_230315:dim_base_dic',\n" +
                "    'zookeeper.quorum' = 'hadoop102:2181,hadoop103:2181,hadoop104:2181'\n" +
                ")";
        sqlEnv.executeSql(baseDicDdl);
        //sqlEnv.sqlQuery("select * from base_dic").execute().print();

        // 5. Processing-time lookup join: resolve the appraise code to its dic_name.
        String lookupJoinSql =
                "select\n" +
                "    id,\n" +
                "    user_id,\n" +
                "    nick_name,\n" +
                "    sku_id,\n" +
                "    spu_id,\n" +
                "    order_id,\n" +
                "    appraise,\n" +
                "    info.dic_name,\n" +
                "    comment_txt,\n" +
                "    create_time,\n" +
                "    operate_time\n" +
                "from comment_info\n" +
                "join base_dic FOR SYSTEM_TIME AS OF comment_info.pt\n" +
                "on comment_info.appraise = base_dic.dic_code";
        Table joinedComments = sqlEnv.sqlQuery(lookupJoinSql);
        sqlEnv.createTemporaryView("result_table", joinedComments);

        //sqlEnv.sqlQuery("select * from result_table").execute().print();

        // 6. Declare the Kafka sink table and submit the INSERT (executeSql on an
        //    INSERT statement triggers job execution; no env.execute() required).
        String sinkDdl =
                "create table dwd_comment_info(\n" +
                "    id string,\n" +
                "    user_id string,\n" +
                "    nick_name string,\n" +
                "    sku_id string,\n" +
                "    spu_id string,\n" +
                "    order_id string,\n" +
                "    appraise string,\n" +
                "    dic_name string,\n" +
                "    comment_txt string,\n" +
                "    create_time string,\n" +
                "    operate_time string\n" +
                ")" + KafkaUtil.getKafkaSinkDDL(Constant.TOPIC_DWD_INTERACTION_COMMENT_INFO);
        sqlEnv.executeSql(sinkDdl);
        sqlEnv.executeSql("insert into dwd_comment_info select * from result_table");

    }

}
